package com.freedom.monitor.myeye.storage.utils;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
import org.apache.hadoop.hbase.util.Bytes;

import com.freedom.monitor.myeye.commmon.utils.StringUtils;
import com.freedom.rpc.thrift.common.utils.Logger;

/**
 * Static helpers built around one process-wide HBase {@link Connection}.
 *
 * <p>
 * The static initializer builds the client configuration, opens the shared
 * connection and makes sure the four tables named in {@code ConstantUtils}
 * exist. If any of those steps fails the error is logged and the JVM is
 * terminated with exit code -1.
 */
public class HBaseUtils {
	private static final Logger logger = Logger.getLogger(HBaseUtils.class);
	// Shared connection, created once by the static initializer and reused by
	// every helper in this class.
	private static Connection GLOBAL_CONNECTION = null;

	static {
		try {
			// Reference: http://m.blog.csdn.net/article/details?id=51254508
			// 1) Build the global client configuration from the project properties.
			logger.info("------------------------------------------------------");
			logger.info("try to set HBase config file");
			Configuration conf = HBaseConfiguration.create();
			String[] propertyKeys = { ConstantUtils.CLIENT_PORT, ConstantUtils.QUORUM, ConstantUtils.ZNODE,
					ConstantUtils.RETRY, ConstantUtils.RPC_TIMEOUT, ConstantUtils.SHORT_OPERATION_TIMEOUT };
			for (String key : propertyKeys) {
				conf.set(key, PropertyUtils.getInstance().getProperty(key));
			}
			logger.info("succeed to create HBase config file ---> " + conf);
			logger.info("------------------------------------------------------");
			// 2) Open the shared connection.
			GLOBAL_CONNECTION = ConnectionFactory.createConnection(conf);
			logger.info("connection ---> " + GLOBAL_CONNECTION.getClass().getName());
			logger.info("------------------------------------------------------");
			// 3) Ensure the tables exist, in this order: service, serverID, subKey,
			// and the base data stored for a key.
			String[] requiredTables = { ConstantUtils.TABLE_SERVICE, ConstantUtils.TABLE_SERVER_ID,
					ConstantUtils.TABLE_SUB_KEY, ConstantUtils.TABLE_DATA };
			for (String requiredTable : requiredTables) {
				if (!createTable(requiredTable)) {
					throw new Exception("fail to create table ---> " + requiredTable);
				}
			}
		} catch (Exception e) {
			// Nothing in this class works without the connection and the tables,
			// so the process is stopped here.
			logger.error(e.toString());
			System.exit(-1);
		}

	}

	/**
	 * Ensures that {@code tableName} exists, creating it with the single column
	 * family {@code ConstantUtils.FAMILY1} when it does not.
	 *
	 * @param tableName
	 *            name of the table to check or create
	 * @return {@code true} when the table exists after the call (including the
	 *         case where creation reported {@link TableExistsException});
	 *         {@code false} when any other exception occurred
	 */
	private static boolean createTable(String tableName) {
		HBaseAdmin admin = null;
		try {
			admin = (HBaseAdmin) GLOBAL_CONNECTION.getAdmin();
			logger.info("admin ---> " + admin.getClass().getName());
			TableName name = TableName.valueOf(tableName);
			if (admin.tableExists(name)) {
				logger.info("table [" + tableName + "] already exists");
			} else {
				logger.info("Table [" + tableName + "] not exist,try to create...");
				HTableDescriptor tableDescriptor = new HTableDescriptor(name);
				tableDescriptor.addFamily(new HColumnDescriptor(ConstantUtils.FAMILY1));
				admin.createTable(tableDescriptor);
				logger.info("succeed to create table [" + tableName + "]");
			}
			logger.info("succeed to ensure table [" + tableName + "] exists");
			return true;
		} catch (TableExistsException te) {
			// The table appeared between the existence check and the create call;
			// the goal (table exists) is still met.
			logger.info("table [" + tableName + "] already exists by " + te.toString());
			return true;
		} catch (Exception e) {
			logger.error(e.toString());
			return false;
		} finally {
			// A failure while closing the admin handle is logged only; it does not
			// change the result computed above.
			if (null != admin) {
				try {
					admin.close();
				} catch (Exception e) {
					logger.error(e.toString());
				}
			}
		}

	}

	/**
	 * Writes a batch of {@link Put}s into {@code tableName}.
	 *
	 * @param tableName
	 *            name of the target table
	 * @param puts
	 *            mutations to write; {@code null} or an empty list is a no-op
	 * @return {@code true} when there was nothing to write or the batch was
	 *         written; {@code false} when obtaining the table or writing threw
	 */
	public static boolean insertIntoTable(String tableName, List<Put> puts) {
		if (null == puts || puts.isEmpty()) {
			return true;
		}
		Table table = null;
		try {
			table = GLOBAL_CONNECTION.getTable(TableName.valueOf(tableName));
			table.put(puts);
			logger.debug("succeed to insert Into Table ---> " + tableName);
			return true;
		} catch (Exception e) {
			logger.error(e.toString());
			logger.error("fail to insert Into Table ---> " + tableName);
			return false;
		} finally {
			if (null != table) {
				try {
					table.close();// the HBase table handle must always be released
				} catch (Exception e) {
					// The write outcome is already decided; record the close failure
					// instead of hiding it.
					logger.error("fail to close table ---> " + tableName + ", " + e.toString());
				}
			}
		}

	}

	// References: https://zhidao.baidu.com/question/1111977222203709219.html
	// http://blog.csdn.net/cnweike/article/details/42920547
	// Use only where it fits; you need to know what you are doing.
	/**
	 * Scans {@code tableName} for row keys that start with {@code prefix}
	 * followed by {@code StringUtils.GLOBAL_MAGIC_KEY} and returns the last
	 * delimiter-separated segment of each matching row key.
	 *
	 * <p>
	 * Only keys are fetched (a {@link KeyOnlyFilter} is applied). The scan range
	 * is {@code [prefix + KEY, prefix + KEY + "~")}.
	 * NOTE(review): because the stop row ends with {@code "~"}, row keys whose
	 * next byte sorts at or after {@code '~'} are outside the range - confirm
	 * that the key alphabet makes this acceptable.
	 *
	 * @param tableName
	 *            name of the table to scan
	 * @param prefix
	 *            leading part of the row keys to match; must not be {@code null}
	 * @return the distinct last segments of the matching row keys; never
	 *         {@code null}
	 * @throws NullPointerException
	 *             if {@code prefix} is {@code null}
	 * @throws Exception
	 *             if the table cannot be opened, scanned or closed
	 */
	public static Set<String> queryTips(String tableName, String prefix) throws Exception {
		if (null == prefix) {
			throw new NullPointerException("prefix must not be null");
		}
		Set<String> serviceSet = new HashSet<String>();
		// 1) Build the scan over the key range that starts with prefix + delimiter.
		String startKey = prefix.concat(StringUtils.GLOBAL_MAGIC_KEY);
		Scan scan = new Scan();
		scan.setStartRow(Bytes.toBytes(startKey));
		scan.setStopRow(Bytes.toBytes(startKey.concat("~")));
		FilterList filterList = new FilterList(Operator.MUST_PASS_ALL);
		filterList.addFilter(new KeyOnlyFilter());// cell values are not needed
		scan.setFilter(filterList);
		scan.setCaching(1000);
		scan.setCacheBlocks(false);
		// The delimiter is used literally when building row keys above, so it
		// must be matched literally here as well, not interpreted as a regex.
		Pattern delimiter = Pattern.compile(Pattern.quote(StringUtils.GLOBAL_MAGIC_KEY));
		// 2) Open the table and the scanner; both are closed automatically, the
		// scanner first, even when iteration or a close call fails.
		try (Table table = GLOBAL_CONNECTION.getTable(TableName.valueOf(tableName));
				ResultScanner results = table.getScanner(scan)) {
			for (Result result : results) {
				String rowKey = Bytes.toString(result.getRow());// the row key
				if (null == rowKey || rowKey.isEmpty()) {
					continue;
				}
				String[] segments = delimiter.split(rowKey);
				if (segments.length == 0) {
					// The row key consisted of delimiters only; nothing to extract.
					continue;
				}
				serviceSet.add(segments[segments.length - 1]);// keep the last segment
			}
		}
		return serviceSet;
	}

	/**
	 * Manual smoke test: runs one prefix query against the service table and
	 * prints the stack trace if it fails.
	 */
	public static void main(String[] args) {
		try {
			queryTips(ConstantUtils.TABLE_SERVICE, "yourDefaultProduct");
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}
